<!DOCTYPE html>
<html lang="en">

<head>
	<meta charset="UTF-8">
	<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
	<meta name="keywords" content="Fescar、分布式事务" />
	<meta name="description" content="seata-analysis-simple" />
	<!-- 网页标签标题 -->
	<title>Fescar分布式事务原理解析探秘</title>
  <link rel="shortcut icon" href="/img/seata_logo_small.jpeg"/>
	<link rel="stylesheet" href="/build/blogDetail.css" />
</head>
<body>
	<div id="root"><div class="blog-detail-page" data-reactroot=""><header class="header-container header-container-normal"><div class="header-body"><a href="/zh-cn/index.html"><img class="logo" src="//img.alicdn.com/tfs/TB1gqL1w4D1gK0jSZFyXXciOVXa-1497-401.png"/></a><div class="search search-normal"><span class="icon-search"></span></div><span class="language-switch language-switch-normal">En</span><div class="header-menu"><img class="header-menu-toggle" src="https://img.alicdn.com/tfs/TB14eEmw7P2gK0jSZPxXXacQpXa-38-32.png"/><ul><li class="menu-item menu-item-normal"><a href="/zh-cn/index.html" target="_self">首页</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/docs/overview/what-is-seata.html" target="_self">文档</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/docs/developers/developers_dev.html" target="_self">开发者</a></li><li class="menu-item menu-item-normal menu-item-normal-active"><a href="/zh-cn/blog/index.html" target="_self">博客</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/community/index.html" target="_self">社区</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/blog/download.html" target="_self">下载</a></li></ul></div></div></header><section class="blog-content markdown-body"><h1>前言</h1>
<p>fescar发布已有时日，分布式事务一直是业界备受关注的领域，fescar发布一个月左右便受到了近5000个star足以说明其热度。当然，在fescar出来之前，
已经有比较成熟的分布式事务的解决方案开源了，比较典型的方案如 <a href="https://github.com/codingapi/tx-lcn">LCN</a> 的2pc型无侵入事务，
目前lcn已发展到5.0，已支持和fescar事务模型类似的TCX型事务。还有如TCC型事务实现 <a href="https://github.com/yu199195/hmily">hmily</a> <a href="https://github.com/changmingxie/tcc-transaction">tcc-transaction</a> 等。
在微服务架构流行的当下、阿里这种开源大户背景下，fescar的发布无疑又掀起了研究分布式事务的热潮。fescar脱胎于阿里云商业分布式事务服务GTS，在线上环境提供这种公共服务其模式肯定经受了非常严苛的考验。其分布式事务模型TXC又仿于传统事务模型XA方案，主要区别在于资源管理器的定位一个在应用层一个在数据库层。博主觉得fescar的txc模型实现非常有研究的价值，所以今天我们来好好翻一翻fescar项目的代码。本文篇幅较长，浏览并理解本文大概耗时30~60分钟左右。</p>
<h1>项目地址</h1>
<p>fescar：<a href="https://github.com/alibaba/fescar">https://github.com/alibaba/fescar</a></p>
<p>本博文所述代码为fescar的0.1.2-SNAPSHOT版本，根据fescar后期的迭代计划，其项目结构和模块实现都可能有很大的改变，特此说明。</p>
<h1>fescar的TXC模型</h1>
<p><img src="/img/blog/c45496461bca15ecd522e497d98ba954f95.jpg" alt=""></p>
<p>上图为fescar官方针对TXC模型制作的示意图。不得不说大厂的图制作的真的不错，结合示意图我们可以看到TXC实现的全貌。TXC的实现通过三个组件来完成。也就是上图的三个深黄色部分，其作用如下：</p>
<ol>
<li>TM：全局事务管理器，在标注开启fescar分布式事务的服务端开启，并将全局事务发送到TC事务控制端管理</li>
<li>TC：事务控制中心，控制全局事务的提交或者回滚。这个组件需要独立部署维护，目前只支持单机版本，后续迭代计划会有集群版本</li>
<li>RM：资源管理器，主要负责分支事务的上报，本地事务的管理</li>
</ol>
<p>一段话简述其实现过程：服务起始方发起全局事务并注册到TC。在调用协同服务时，协同服务的事务分支事务会先完成阶段一的事务提交或回滚，并生成事务回滚的undo_log日志，同时注册当前协同服务到TC并上报其事务状态，归并到同一个业务的全局事务中。此时若没有问题继续下一个协同服务的调用，期间任何协同服务的分支事务回滚，都会通知到TC，TC在通知全局事务包含的所有已完成一阶段提交的分支事务回滚。如果所有分支事务都正常，最后回到全局事务发起方时，也会通知到TC，TC在通知全局事务包含的所有分支删除回滚日志。在这个过程中为了解决写隔离和度隔离的问题会涉及到TC管理的全局锁。</p>
<p>本博文的目标是深入代码细节，探究其基本思路是如何实现的。首先会从项目的结构来简述每个模块的作用，继而结合官方自带的examples实例来探究整个分布式事务的实现过程。</p>
<h1>项目结构解析</h1>
<p>项目拉下来，用IDE打开后的目录结构如下，下面先大致的看下每个模块的实现</p>
<p><img src="/img/blog/a88cf147f489f913886ef1785d94183bf09.jpg" alt=""></p>
<ul>
<li>common ：公共组件，提供常用辅助类，静态变量、扩展机制类加载器、以及定义全局的异常等</li>
<li>config : 配置加载解析模块，提供了配置的基础接口，目前只有文件配置实现，后续会有nacos等配置中心的实现</li>
<li>core : 核心模块主要封装了TM、RM和TC通讯用RPC相关内容</li>
<li>dubbo ：dubbo模块主要适配dubbo通讯框架，使用dubbo的filter机制来传统全局事务的信息到分支</li>
<li>examples ：简单的演示实例模块，等下从这个模块入手探索</li>
<li>rm-datasource :资源管理模块，比较核心的一个模块，个人认为这个模块命名为core要更合理一点。代理了JDBC的一些类，用来解析sql生成回滚日志、协调管理本地事务</li>
<li>server : TC组件所在，主要协调管理全局事务，负责全局事务的提交或者回滚，同时管理维护全局锁。</li>
<li>spring ：和spring集成的模块，主要是aop逻辑，是整个分布式事务的入口，研究fescar的突破口</li>
<li>tm : 全局事务事务管理模块，管理全局事务的边界，全局事务开启回滚点都在这个模块控制</li>
</ul>
<h1>通过【examples】模块的实例看下效果</h1>
<p>第一步、先启动TC也就是【Server】模块，main方法直接启动就好，默认服务端口8091</p>
<p>第二步、回到examples模块，将订单，业务，账户、仓库四个服务的配置文件配置好，主要是mysql数据源和zookeeper连接地址，这里要注意下，默认dubbo的zk注册中心依赖没有，启动的时候回抛找不到class的异常，需要添加如下的依赖：</p>
<pre><code class="language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">dependency</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>com.101tec<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>zkclient<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">version</span>&gt;</span>0.10<span class="hljs-tag">&lt;/<span class="hljs-name">version</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">exclusions</span>&gt;</span>
        <span class="hljs-tag">&lt;<span class="hljs-name">exclusion</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">artifactId</span>&gt;</span>slf4j-log4j12<span class="hljs-tag">&lt;/<span class="hljs-name">artifactId</span>&gt;</span>
            <span class="hljs-tag">&lt;<span class="hljs-name">groupId</span>&gt;</span>org.slf4j<span class="hljs-tag">&lt;/<span class="hljs-name">groupId</span>&gt;</span>
        <span class="hljs-tag">&lt;/<span class="hljs-name">exclusion</span>&gt;</span>
    <span class="hljs-tag">&lt;/<span class="hljs-name">exclusions</span>&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">dependency</span>&gt;</span>
</code></pre>
<p>第三步、在BusinessServiceImpl中的模拟抛异常的地方打个断点，依次启动OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四个服务、等进断点后，查看数据库account_tbl表，金额已减去400元，变成了599元。然后放开断点、BusinessServiceImpl模块模拟的异常触发，全局事务回滚，account_tbl表的金额就又回滚到999元了</p>
<p>如上，我们已经体验到fescar事务的控制能力了，下面我们具体看下它是怎么控制的。</p>
<h1>fescar事务过程分析</h1>
<h2>首先分析配置文件</h2>
<p>这个是一个铁律，任何一个技术或框架要集成，配置文件肯定是一个突破口。从上面的例子我们了解到，实例模块的配置文件中配置了一个全局事务扫描器实例，如：</p>
<pre><code class="language-xml"><span class="hljs-tag">&lt;<span class="hljs-name">bean</span> <span class="hljs-attr">class</span>=<span class="hljs-string">"com.alibaba.fescar.spring.annotation.GlobalTransactionScanner"</span>&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">constructor-arg</span> <span class="hljs-attr">value</span>=<span class="hljs-string">"dubbo-demo-app"</span>/&gt;</span>
    <span class="hljs-tag">&lt;<span class="hljs-name">constructor-arg</span> <span class="hljs-attr">value</span>=<span class="hljs-string">"my\_test\_tx_group"</span>/&gt;</span>
<span class="hljs-tag">&lt;/<span class="hljs-name">bean</span>&gt;</span>
</code></pre>
<p>这个实例在项目启动时会扫描所有实例，具体实现见【spring】模块。并将标注了@GlobalTransactional注解的方法织入GlobalTransactionalInterceptor的invoke方法逻辑。同时应用启动时，会初始化TM（TmRpcClient）和RM（RmRpcClient）的实例，这个时候，服务已经和TC事务控制中心勾搭上了。在往下看就涉及到TM模块的事务模板类TransactionalTemplate。</p>
<h2>【TM】模块启动全局事务</h2>
<p>全局事务的开启，提交、回滚都被封装在TransactionalTemplate中完成了，代码如：</p>
<pre><code class="language-java">
<span class="hljs-function"><span class="hljs-keyword">public</span> Object <span class="hljs-title">execute</span><span class="hljs-params">(TransactionalExecutor business)</span> <span class="hljs-keyword">throws</span> TransactionalExecutor.ExecutionException </span>{
    <span class="hljs-comment">// 1. get or create a transaction</span>
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    <span class="hljs-comment">// 2. begin transaction</span>
    <span class="hljs-keyword">try</span> {
        tx.begin(business.timeout(), business.name());
    } <span class="hljs-keyword">catch</span> (TransactionException txe) {
        <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
    Object rs = <span class="hljs-keyword">null</span>;
    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// Do Your Business</span>
        rs = business.execute();
    } <span class="hljs-keyword">catch</span> (Throwable ex) {
        <span class="hljs-comment">// 3. any business exception, rollback.</span>
        <span class="hljs-keyword">try</span> {
            tx.rollback();
            <span class="hljs-comment">// 3.1 Successfully rolled back</span>
            <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
        } <span class="hljs-keyword">catch</span> (TransactionException txe) {
            <span class="hljs-comment">// 3.2 Failed to rollback</span>
            <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.RollbackFailure, ex);
        }
    }
    <span class="hljs-comment">// 4. everything is fine, commit.</span>
    <span class="hljs-keyword">try</span> {
        tx.commit();
    } <span class="hljs-keyword">catch</span> (TransactionException txe) {
        <span class="hljs-comment">// 4.1 Failed to commit</span>
        <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure);
    }
    <span class="hljs-keyword">return</span> rs;
}
</code></pre>
<p>更详细的实现在【TM】模块中被分成了两个Class实现，如下：</p>
<p>DefaultGlobalTransaction ：全局事务具体的开启，提交、回滚动作</p>
<p>DefaultTransactionManager ：负责使用TmRpcClient向TC控制中心发送指令，如开启全局事务（GlobalBeginRequest）、提交（GlobalCommitRequest）、回滚（GlobalRollbackRequest）、查询状态（GlobalStatusRequest）等。</p>
<p>以上是TM模块核心内容点，TM模块完成全局事务开启后，接下来就开始看看全局事务iD，xid是如何传递、RM组件是如何介入的</p>
<h2>【dubbo】全局事务xid的传递</h2>
<p>首先是xid的传递，目前已经实现了dubbo框架实现的微服务架构下的传递，其他的像spring cloud和motan等的想要实现也很容易，通过一般RPC通讯框架都有的filter机制，将xid从全局事务的发起节点传递到服务协从节点，从节点接收到后绑定到当前线程上线文环境中，用于在分支事务执行sql时判断是否加入全局事务。fescar的实现见【dubbo】模块如下：</p>
<pre><code class="language-java"><span class="hljs-meta">@Activate</span>(group = { Constants.PROVIDER, Constants.CONSUMER }, order = <span class="hljs-number">100</span>)
<span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">TransactionPropagationFilter</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">Filter</span> </span>{

    <span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> Result <span class="hljs-title">invoke</span><span class="hljs-params">(Invoker&lt;?&gt; invoker, Invocation invocation)</span> <span class="hljs-keyword">throws</span> RpcException </span>{
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        <span class="hljs-keyword">if</span> (LOGGER.isDebugEnabled()) {
            LOGGER.debug(<span class="hljs-string">"xid in RootContext\["</span> + xid + <span class="hljs-string">"\] xid in RpcContext\["</span> + rpcXid + <span class="hljs-string">"\]"</span>);
        }
        <span class="hljs-keyword">boolean</span> bind = <span class="hljs-keyword">false</span>;
        <span class="hljs-keyword">if</span> (xid != <span class="hljs-keyword">null</span>) {
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } <span class="hljs-keyword">else</span> {
            <span class="hljs-keyword">if</span> (rpcXid != <span class="hljs-keyword">null</span>) {
                RootContext.bind(rpcXid);
                bind = <span class="hljs-keyword">true</span>;
                <span class="hljs-keyword">if</span> (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(<span class="hljs-string">"bind\["</span> + rpcXid + <span class="hljs-string">"\] to RootContext"</span>);
                }
            }
        }
        <span class="hljs-keyword">try</span> {
            <span class="hljs-keyword">return</span> invoker.invoke(invocation);

        } <span class="hljs-keyword">finally</span> {
            <span class="hljs-keyword">if</span> (bind) {
                String unbindXid = RootContext.unbind();
                <span class="hljs-keyword">if</span> (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(<span class="hljs-string">"unbind\["</span> + unbindXid + <span class="hljs-string">"\] from RootContext"</span>);
                }
                <span class="hljs-keyword">if</span> (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn(<span class="hljs-string">"xid in change during RPC from "</span> + rpcXid + <span class="hljs-string">" to "</span> + unbindXid);
                    <span class="hljs-keyword">if</span> (unbindXid != <span class="hljs-keyword">null</span>) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn(<span class="hljs-string">"bind \["</span> + unbindXid + <span class="hljs-string">"\] back to RootContext"</span>);
                    }
                }
            }
        }
    }
}
</code></pre>
<p>上面代码rpcXid不为空时，就加入到了RootContext的ContextCore中，这里稍微深入讲下。ContextCore是一个可扩展实现的接口，目前默认的实现是ThreadLocalContextCore，基于ThreadLocal来保存维护当前的xid。这里fescar提供了可扩展的机制，实现在【common】模块中，通过一个自定义的类加载器EnhancedServiceLoader加载需要扩展的服务类，这样只需要在扩展类加上@LoadLevel注解。标记order属性声明高优先级别，就可以达到扩展实现的目的。</p>
<h2>【RM】模块本地资源管理的介入</h2>
<p>fescar针对本地事务相关的接口，通过代理机制都实现了一遍代理类，如数据源（DataSourceProxy）、ConnectionProxy、StatementProxy等。这个在配置文件中也可以看出来，也就是说，我们要使用fescar分布式事务，一定要配置fescar提供的代理数据源。如：</p>
<p><img src="/img/blog/af317255c71a5c1bf46f7140387acf365f8.jpg" alt=""></p>
<p>配置好代理数据源后，从DataSourceProxy出发，本地针对数据库的所有操作过程我们就可以随意控制了。从上面xid传递，已经知道了xid被保存在RootContext中了，那么请看下面的代码，就非常清楚了：</p>
<p>首先看StatementProxy的一段代码</p>
<p><img src="/img/blog/17896deea47b9aee518812b039c39101d8f.jpg" alt=""></p>
<p>在看ExecuteTemplate中的代码</p>
<p><img src="/img/blog/04488a2745a5d564498462ed64c506174d2.jpg" alt=""></p>
<p>和【TM】模块中的事务管理模板类TransactionlTemplate类似，这里非常关键的逻辑代理也被封装在了ExecuteTemplate模板类中。因重写了Statement有了StatementProxy实现，在执行原JDBC的executeUpdate方法时，会调用到ExecuteTemplate的execute逻辑。在sql真正执行前，会判断RootCOntext当前上下文中是否包含xid，也就是判断当前是否是全局分布式事务。如果不是，就直接使用本地事务，如果是，这里RM就会增加一些分布式事务相关的逻辑了。这里根据sql的不同的类型，fescar封装了五个不同的执行器来处理，分别是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor，结构如下图：</p>
<p><img src="/img/blog/bb9a2f07054f19bc21adc332671de4f7b75.jpg" alt=""></p>
<h3>PlainExecutor：</h3>
<p>原生的JDBC接口实现，未做任何处理，提供给全局事务中的普通的select查询使用</p>
<h3>UpdateExecutor、DeleteExecutor、InsertExecutor：</h3>
<p>三个DML增删改执行器实现，主要在sql执行的前后对sql语句进行了解析，实现了如下两个抽象接口方法：</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">protected</span> <span class="hljs-keyword">abstract</span> TableRecords <span class="hljs-title">beforeImage</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> SQLException</span>;

<span class="hljs-function"><span class="hljs-keyword">protected</span> <span class="hljs-keyword">abstract</span> TableRecords <span class="hljs-title">afterImage</span><span class="hljs-params">(TableRecords beforeImage)</span> <span class="hljs-keyword">throws</span> SQLException</span>;
</code></pre>
<p>在这个过程中通过解析sql生成了提供回滚操作的undo_log日志，日志目前是保存在msyql中的，和业务sql操作共用同一个事务。表的结构如下：</p>
<p><img src="/img/blog/fd5c423815d3b84bdbc70d7efeb9cd16757.jpg" alt=""></p>
<p>rollback_info保存的undo_log详细信息，是longblob类型的，结构如下：</p>
<pre><code class="language-json">{
    <span class="hljs-attr">"branchId"</span>:<span class="hljs-number">3958194</span>,
    <span class="hljs-attr">"sqlUndoLogs"</span>:[
        {
            <span class="hljs-attr">"afterImage"</span>:{
                <span class="hljs-attr">"rows"</span>:[
                    {
                        <span class="hljs-attr">"fields"</span>:[
                            {
                                <span class="hljs-attr">"keyType"</span>:<span class="hljs-string">"PrimaryKey"</span>,
                                <span class="hljs-attr">"name"</span>:<span class="hljs-string">"ID"</span>,
                                <span class="hljs-attr">"type"</span>:<span class="hljs-number">4</span>,
                                <span class="hljs-attr">"value"</span>:<span class="hljs-number">10</span>
                            },
                            {
                                <span class="hljs-attr">"keyType"</span>:<span class="hljs-string">"NULL"</span>,
                                <span class="hljs-attr">"name"</span>:<span class="hljs-string">"COUNT"</span>,
                                <span class="hljs-attr">"type"</span>:<span class="hljs-number">4</span>,
                                <span class="hljs-attr">"value"</span>:<span class="hljs-number">98</span>
                            }
                        ]
                    }
                ],
                <span class="hljs-attr">"tableName"</span>:<span class="hljs-string">"storage_tbl"</span>
            },
            <span class="hljs-attr">"beforeImage"</span>:{
                <span class="hljs-attr">"rows"</span>:[
                    {
                        <span class="hljs-attr">"fields"</span>:[
                            {
                                <span class="hljs-attr">"keyType"</span>:<span class="hljs-string">"PrimaryKey"</span>,
                                <span class="hljs-attr">"name"</span>:<span class="hljs-string">"ID"</span>,
                                <span class="hljs-attr">"type"</span>:<span class="hljs-number">4</span>,
                                <span class="hljs-attr">"value"</span>:<span class="hljs-number">10</span>
                            },
                            {
                                <span class="hljs-attr">"keyType"</span>:<span class="hljs-string">"NULL"</span>,
                                <span class="hljs-attr">"name"</span>:<span class="hljs-string">"COUNT"</span>,
                                <span class="hljs-attr">"type"</span>:<span class="hljs-number">4</span>,
                                <span class="hljs-attr">"value"</span>:<span class="hljs-number">100</span>
                            }
                        ]
                    }
                ],
                <span class="hljs-attr">"tableName"</span>:<span class="hljs-string">"storage_tbl"</span>
            },
            <span class="hljs-attr">"sqlType"</span>:<span class="hljs-string">"UPDATE"</span>,
            <span class="hljs-attr">"tableName"</span>:<span class="hljs-string">"storage_tbl"</span>
        }
    ],
    <span class="hljs-attr">"xid"</span>:<span class="hljs-string">"192.168.7.77:8091:3958193"</span>
}


</code></pre>
<p>这里贴的是一个update的操作，undo_log记录的非常的详细，通过全局事务xid关联branchid，记录数据操作的表名，操作字段名，以及sql执行前后的记录数，如这个记录，表名=storage_tbl,sql执行前ID=10，count=100，sql执行后id=10，count=98。如果整个全局事务失败，需要回滚的时候就可以生成：</p>
<pre><code class="language-sql"><span class="hljs-keyword">update</span> storage_tbl <span class="hljs-keyword">set</span> <span class="hljs-keyword">count</span> = <span class="hljs-number">100</span> <span class="hljs-keyword">where</span> <span class="hljs-keyword">id</span> = <span class="hljs-number">10</span>;
</code></pre>
<p>这样的回滚sql语句执行了。</p>
<h3>SelectForUpdateExecutor：</h3>
<p>fescar的AT模式在本地事务之上默认支持读未提交的隔离级别，但是通过SelectForUpdateExecutor执行器，可以支持读已提交的隔离级别。代码如：</p>
<pre><code class="language-java"><span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> Object <span class="hljs-title">doExecute</span><span class="hljs-params">(Object... args)</span> <span class="hljs-keyword">throws</span> Throwable </span>{
    SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;

    Connection conn = statementProxy.getConnection();
    ResultSet rs = <span class="hljs-keyword">null</span>;
    Savepoint sp = <span class="hljs-keyword">null</span>;
    LockRetryController lockRetryController = <span class="hljs-keyword">new</span> LockRetryController();
    <span class="hljs-keyword">boolean</span> originalAutoCommit = conn.getAutoCommit();

    StringBuffer selectSQLAppender = <span class="hljs-keyword">new</span> StringBuffer(<span class="hljs-string">"SELECT "</span>);
    selectSQLAppender.append(getTableMeta().getPkName());
    selectSQLAppender.append(<span class="hljs-string">" FROM "</span> + getTableMeta().getTableName());
    String whereCondition = <span class="hljs-keyword">null</span>;
    ArrayList&lt;Object&gt; paramAppender = <span class="hljs-keyword">new</span> ArrayList&lt;&gt;();
    <span class="hljs-keyword">if</span> (statementProxy <span class="hljs-keyword">instanceof</span> ParametersHolder) {
        whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
    } <span class="hljs-keyword">else</span> {
        whereCondition = recognizer.getWhereCondition();
    }
    <span class="hljs-keyword">if</span> (!StringUtils.isEmpty(whereCondition)) {
        selectSQLAppender.append(<span class="hljs-string">" WHERE "</span> + whereCondition);
    }
    selectSQLAppender.append(<span class="hljs-string">" FOR UPDATE"</span>);
    String selectPKSQL = selectSQLAppender.toString();

    <span class="hljs-keyword">try</span> {
        <span class="hljs-keyword">if</span> (originalAutoCommit) {
            conn.setAutoCommit(<span class="hljs-keyword">false</span>);
        }
        sp = conn.setSavepoint();
        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

        <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>) {
            <span class="hljs-comment">// Try to get global lock of those rows selected</span>
            Statement stPK = <span class="hljs-keyword">null</span>;
            PreparedStatement pstPK = <span class="hljs-keyword">null</span>;
            ResultSet rsPK = <span class="hljs-keyword">null</span>;
            <span class="hljs-keyword">try</span> {
                <span class="hljs-keyword">if</span> (paramAppender.isEmpty()) {
                    stPK = statementProxy.getConnection().createStatement();
                    rsPK = stPK.executeQuery(selectPKSQL);
                } <span class="hljs-keyword">else</span> {
                    pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
                    <span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">0</span>; i &lt; paramAppender.size(); i++) {
                        pstPK.setObject(i + <span class="hljs-number">1</span>, paramAppender.get(i));
                    }
                    rsPK = pstPK.executeQuery();
                }

                TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
                statementProxy.getConnectionProxy().checkLock(selectPKRows);
                <span class="hljs-keyword">break</span>;

            } <span class="hljs-keyword">catch</span> (LockConflictException lce) {
                conn.rollback(sp);
                lockRetryController.sleep(lce);

            } <span class="hljs-keyword">finally</span> {
                <span class="hljs-keyword">if</span> (rsPK != <span class="hljs-keyword">null</span>) {
                    rsPK.close();
                }
                <span class="hljs-keyword">if</span> (stPK != <span class="hljs-keyword">null</span>) {
                    stPK.close();
                }
                <span class="hljs-keyword">if</span> (pstPK != <span class="hljs-keyword">null</span>) {
                    pstPK.close();
                }
            }
        }

    } <span class="hljs-keyword">finally</span> {
        <span class="hljs-keyword">if</span> (sp != <span class="hljs-keyword">null</span>) {
            conn.releaseSavepoint(sp);
        }
        <span class="hljs-keyword">if</span> (originalAutoCommit) {
            conn.setAutoCommit(<span class="hljs-keyword">true</span>);
        }
    }
    <span class="hljs-keyword">return</span> rs;
}
</code></pre>
<p>关键代码见：</p>
<pre><code class="language-java">TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
statementProxy.getConnectionProxy().checkLock(selectPKRows);
</code></pre>
<p>通过selectPKRows表操作记录拿到lockKeys，然后到TC控制器端查询是否被全局锁定了，如果被锁定了，就重新尝试，直到锁释放返回查询结果。</p>
<h2>分支事务的注册和上报</h2>
<p>在本地事务提交前，fescar会注册和上报分支事务相关的信息，见ConnectionProxy类的commit部分代码：</p>
<pre><code class="language-java"><span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">commit</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> SQLException </span>{
    <span class="hljs-keyword">if</span> (context.inGlobalTransaction()) {
        <span class="hljs-keyword">try</span> {
            register();
        } <span class="hljs-keyword">catch</span> (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }

        <span class="hljs-keyword">try</span> {
            <span class="hljs-keyword">if</span> (context.hasUndoLog()) { 
                UndoLogManager.flushUndoLogs(<span class="hljs-keyword">this</span>);
            }
            targetConnection.commit();
        } <span class="hljs-keyword">catch</span> (Throwable ex) {
            report(<span class="hljs-keyword">false</span>);
            <span class="hljs-keyword">if</span> (ex <span class="hljs-keyword">instanceof</span> SQLException) {
                <span class="hljs-keyword">throw</span> (SQLException) ex;
            } <span class="hljs-keyword">else</span> {
                <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> SQLException(ex);
            }
        }
        report(<span class="hljs-keyword">true</span>);
        context.reset();
       
    } <span class="hljs-keyword">else</span> {
        targetConnection.commit();
    }
}
</code></pre>
<p>从这段代码我们可以看到，首先是判断是了是否是全局事务，如果不是，就直接提交了，如果是，就先向TC控制器注册分支事务，为了写隔离，在TC端会涉及到全局锁的获取。然后保存了用于回滚操作的undo_log日志，继而真正提交本地事务，最后向TC控制器上报事务状态。此时，阶段一的本地事务已完成了。</p>
<h2>【server】模块协调全局</h2>
<p>关于server模块，我们可以聚焦在DefaultCoordinator这个类，这个是AbstractTCInboundHandler控制处理器默认实现。主要实现了全局事务开启，提交，回滚，状态查询，分支事务注册，上报，锁检查等接口，如：</p>
<p><img src="/img/blog/3da6fd82debb9470eb4a5feb1eecac6d6a2.jpg" alt=""></p>
<p>回到一开始的TransactionlTemplate，如果整个分布式事务失败需要回滚了，首先是TM向TC发起回滚的指令，然后TC接收到后，解析请求后会被路由到默认控制器类的doGlobalRollback方法内，最终在TC控制器端执行的代码如下：</p>
<pre><code class="language-java"><span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">doGlobalRollback</span><span class="hljs-params">(GlobalSession globalSession, <span class="hljs-keyword">boolean</span> retrying)</span> <span class="hljs-keyword">throws</span> TransactionException </span>{
    <span class="hljs-keyword">for</span> (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
        BranchStatus currentBranchStatus = branchSession.getStatus();
        <span class="hljs-keyword">if</span> (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
            <span class="hljs-keyword">continue</span>;
        }
        <span class="hljs-keyword">try</span> {
            BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                    branchSession.getResourceId(), branchSession.getApplicationData());

            <span class="hljs-keyword">switch</span> (branchStatus) {
                <span class="hljs-keyword">case</span> PhaseTwo_Rollbacked:
                    globalSession.removeBranch(branchSession);
                    LOGGER.error(<span class="hljs-string">"Successfully rolled back branch "</span> + branchSession);
                    <span class="hljs-keyword">continue</span>;
                <span class="hljs-keyword">case</span> PhaseTwo\_RollbackFailed\_Unretryable:
                    GlobalStatus currentStatus = globalSession.getStatus();
                    <span class="hljs-keyword">if</span> (currentStatus.name().startsWith(<span class="hljs-string">"Timeout"</span>)) {
                        globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
                    } <span class="hljs-keyword">else</span> {
                        globalSession.changeStatus(GlobalStatus.RollbackFailed);
                    }
                    globalSession.end();
                    LOGGER.error(<span class="hljs-string">"Failed to rollback global\["</span> + globalSession.getTransactionId() + <span class="hljs-string">"\] since branch\["</span> + branchSession.getBranchId() + <span class="hljs-string">"\] rollback failed"</span>);
                    <span class="hljs-keyword">return</span>;
                <span class="hljs-keyword">default</span>:
                    LOGGER.info(<span class="hljs-string">"Failed to rollback branch "</span> + branchSession);
                    <span class="hljs-keyword">if</span> (!retrying) {
                        queueToRetryRollback(globalSession);
                    }
                    <span class="hljs-keyword">return</span>;

            }
        } <span class="hljs-keyword">catch</span> (Exception ex) {
            LOGGER.info(<span class="hljs-string">"Exception rollbacking branch "</span> + branchSession, ex);
            <span class="hljs-keyword">if</span> (!retrying) {
                queueToRetryRollback(globalSession);
                <span class="hljs-keyword">if</span> (ex <span class="hljs-keyword">instanceof</span> TransactionException) {
                    <span class="hljs-keyword">throw</span> (TransactionException) ex;
                } <span class="hljs-keyword">else</span> {
                    <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> TransactionException(ex);
                }
            }

        }

    }
    GlobalStatus currentStatus = globalSession.getStatus();
    <span class="hljs-keyword">if</span> (currentStatus.name().startsWith(<span class="hljs-string">"Timeout"</span>)) {
        globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
    } <span class="hljs-keyword">else</span> {
        globalSession.changeStatus(GlobalStatus.Rollbacked);
    }
    globalSession.end();
}
</code></pre>
<p>如上代码可以看到，回滚时从全局事务会话中迭代每个分支事务，然后通知每个分支事务回滚。分支服务接收到请求后，首先会被路由到RMHandlerAT中的doBranchRollback方法，继而调用了RM中的branchRollback方法，代码如下：</p>
<pre><code class="language-java"><span class="hljs-meta">@Override</span>
<span class="hljs-function"><span class="hljs-keyword">public</span> BranchStatus <span class="hljs-title">branchRollback</span><span class="hljs-params">(String xid, <span class="hljs-keyword">long</span> branchId, String resourceId, String applicationData)</span> <span class="hljs-keyword">throws</span> TransactionException </span>{
    DataSourceProxy dataSourceProxy = get(resourceId);
    <span class="hljs-keyword">if</span> (dataSourceProxy == <span class="hljs-keyword">null</span>) {
        <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> ShouldNeverHappenException();
    }
    <span class="hljs-keyword">try</span> {
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } <span class="hljs-keyword">catch</span> (TransactionException te) {
        <span class="hljs-keyword">if</span> (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            <span class="hljs-keyword">return</span> BranchStatus.PhaseTwo\_RollbackFailed\_Unretryable;
        } <span class="hljs-keyword">else</span> {
            <span class="hljs-keyword">return</span> BranchStatus.PhaseTwo\_RollbackFailed\_Retryable;
        }
    }
    <span class="hljs-keyword">return</span> BranchStatus.PhaseTwo_Rollbacked;
}
</code></pre>
<p>RM分支事务端最后执行的是UndoLogManager的undo方法，通过xid和branchid从数据库查询出回滚日志，完成数据回滚操作，整个过程都是同步完成的。如果全局事务是成功的，TC也会有类似的上述协调过程，只不过是异步的将本次全局事务相关的undo_log清除了而已。至此，就完成了2阶段的提交或回滚，也就完成了完整的全局事务事务的控制。</p>
<h1>结语</h1>
<p>如果你看到这里，那么非常感谢你，在繁忙工作之余耐心的花时间来学习。同时，我相信花的时间没白费，完整的浏览理解估计对fescar实现的大致流程了解的十之八九了。本文从构思立题到完成大概耗时1人天左右，博主在这个过程中，对fescar的实现也有了更加深入的了解。由于篇幅原因，并没有面面俱到的对每个实现的细节去深究，如sql是如何解析的等，更多的是在fescar的TXC模型的实现过程的关键点做了详细阐述。本文已校对，但由于个人知识水平及精力有限，文中不免出现错误或理解不当的地方，欢迎指正。</p>
<h3>作者简介：</h3>
<p>陈凯玲，2016年5月加入凯京科技。曾任职高级研发和项目经理，现任凯京科技研发中心架构&amp;运维部负责人。pmp项目管理认证，阿里云MVP。热爱开源，先后开源过多个热门项目。热爱分享技术点滴，独立博客KL博客（<a href="http://www.kailing.pub/">http://www.kailing.pub</a>）博主。</p>
</section><footer class="footer-container"><div class="footer-body"><img src="//img.alicdn.com/tfs/TB1dGrSwVT7gK0jSZFpXXaTkpXa-4802-1285.png"/><p class="docsite-power">website powered by docsite</p><div class="cols-container"><div class="col col-12"><h3>愿景</h3><p>Seata 是一款阿里巴巴开源的分布式事务解决方案，致力于在微服务架构下提供高性能和简单易用的分布式事务服务。</p></div><div class="col col-6"><dl><dt>文档</dt><dd><a href="/zh-cn/docs/overview/what-is-seata.html" target="_self">Seata 是什么？</a></dd><dd><a href="/zh-cn/docs/user/quickstart.html" target="_self">快速开始</a></dd><dd><a href="https://github.com/seata/seata.github.io/issues/new" target="_self">报告文档问题</a></dd><dd><a href="https://github.com/seata/seata.github.io" target="_self">在Github上编辑此文档</a></dd></dl></div><div class="col col-6"><dl><dt>资源</dt><dd><a href="/zh-cn/blog/index.html" target="_self">博客</a></dd><dd><a href="/zh-cn/community/index.html" target="_self">社区</a></dd></dl></div></div><div class="copyright"><span>Copyright © 2019 Seata</span></div></div></footer></div></div>
	<script src="https://f.alicdn.com/react/15.4.1/react-with-addons.min.js"></script>
	<script src="https://f.alicdn.com/react/15.4.1/react-dom.min.js"></script>
	<script>
		window.rootPath = '';
  </script>
	<script src="/build/blogDetail.js"></script>
	<script>
    var _hmt = _hmt || [];
    (function() {
      var hm = document.createElement("script");
      hm.src = "https://hm.baidu.com/hm.js?104e73ef0c18b416b27abb23757ed8ee";
      var s = document.getElementsByTagName("script")[0];
      s.parentNode.insertBefore(hm, s);
    })();
    </script>
</body>
</html>
